/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package io.netty.util.concurrent;

import io.netty.util.internal.ThreadExecutorMap;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Queue;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Single-thread singleton {@link EventExecutor}.
 * <p>
 * It starts its worker thread automatically and stops it when no task has been pending in the task queue
 * for 1 second.
 * Please note that scheduling a large number of tasks on this executor does not scale; use a dedicated
 * executor in that case.
 */
public final class GlobalEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(GlobalEventExecutor.class);

    // Interval of the quiet-period task.
    // Must be declared BEFORE INSTANCE: static initializers run in textual order, and the instance field
    // initializers below read this value while INSTANCE is being constructed.
    private static final long SCHEDULE_QUIET_PERIOD_INTERVAL = TimeUnit.SECONDS.toNanos(1);

    // The global singleton instance.
    public static final GlobalEventExecutor INSTANCE = new GlobalEventExecutor();

    // Queue of tasks that are ready to run.
    final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<Runnable>();
    // No-op task that is added to the scheduled task queue by the constructor. Once the worker has run it,
    // the worker checks whether it may terminate (see TaskRunner).
    // NOTE(review): the negative period presumably selects fixed-delay rescheduling in ScheduledFutureTask
    // -- confirm against that class.
    final ScheduledFutureTask<Void> quietPeriodTask = new ScheduledFutureTask<Void>(
            this,
            Executors.<Void>callable(new Runnable() {
                @Override
                public void run() {
                    // NOOP
                }
            }, null),
            ScheduledFutureTask.deadlineNanos(SCHEDULE_QUIET_PERIOD_INTERVAL),
            -SCHEDULE_QUIET_PERIOD_INTERVAL
    );

    // Because the GlobalEventExecutor is a singleton, tasks submitted to it can come from arbitrary threads,
    // which can trigger the creation of a worker thread from an arbitrary thread group.
    // For this reason the thread factory must not be sticky about its thread group.
    // visible for testing
    final ThreadFactory threadFactory;
    private final TaskRunner taskRunner = new TaskRunner();
    // Whether a worker thread has been marked as started.
    private final AtomicBoolean started = new AtomicBoolean();
    // This executor cannot be shut down, so its termination future is a pre-failed future.
    private final Future<?> terminationFuture = new FailedFuture<Object>(this, new UnsupportedOperationException());
    // The most recently started worker thread, or null if none has been started yet.
    volatile Thread thread;

    private GlobalEventExecutor() {
        scheduledTaskQueue().add(quietPeriodTask);
        threadFactory = ThreadExecutorMap.apply(
                new DefaultThreadFactory(DefaultThreadFactory.toPoolName(getClass()), false, Thread.NORM_PRIORITY, null),
                this
        );
    }

    /**
     * Takes the next {@link Runnable} from the task queue, blocking while no task is present.
     *
     * @return {@code null} if the executor thread has been interrupted or woken up
     */
    Runnable takeTask() {
        BlockingQueue<Runnable> taskQueue = this.taskQueue;
        for (; ; ) {
            ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
            // No scheduled task at all: block on taskQueue until a task arrives.
            if (scheduledTask == null) {
                Runnable task = null;
                try {
                    task = taskQueue.take();
                } catch (InterruptedException e) {
                    // Ignore; null is returned so that the caller can re-evaluate its state.
                }
                return task;
            }
            // There is at least one scheduled task.
            else {
                long delayNanos = scheduledTask.delayNanos();
                Runnable task;
                // The earliest scheduled task is not due yet: wait on taskQueue for at most its delay.
                if (delayNanos > 0) {
                    try {
                        task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
                    } catch (InterruptedException e) {
                        // Waken up.
                        return null;
                    }
                }
                // The earliest scheduled task is already due: do not block.
                else {
                    task = taskQueue.poll();
                }

                // Still no task: move every due scheduled task into taskQueue and poll once more.
                if (task == null) {
                    fetchFromScheduledTaskQueue();
                    task = taskQueue.poll();
                }

                if (task != null) {
                    return task;
                }
            }
        }
    }

    /**
     * Moves every scheduled task that is due into {@link #taskQueue}.
     */
    private void fetchFromScheduledTaskQueue() {
        long nanoTime = AbstractScheduledEventExecutor.nanoTime();
        Runnable scheduledTask = pollScheduledTask(nanoTime);
        while (scheduledTask != null) {
            taskQueue.add(scheduledTask);
            scheduledTask = pollScheduledTask(nanoTime);
        }
    }

    /**
     * Returns the number of tasks currently waiting in {@link #taskQueue}. Scheduled tasks that have not yet
     * been moved into {@link #taskQueue} are not counted.
     *
     * <strong>
     * Be aware that this operation may be expensive depending on the queue implementation; use it with care.
     * </strong>
     */
    public int pendingTasks() {
        return taskQueue.size();
    }

    /**
     * Adds a task to {@link #taskQueue}.
     *
     * @throws NullPointerException if {@code task} is {@code null}
     */
    private void addTask(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        taskQueue.add(task);
    }

    @Override
    public boolean inEventLoop(Thread thread) {
        return thread == this.thread;
    }

    /**
     * This executor cannot be shut down; the call has no effect and returns {@link #terminationFuture()}.
     */
    @Override
    public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
        return terminationFuture();
    }

    @Override
    public Future<?> terminationFuture() {
        return terminationFuture;
    }

    /**
     * Always throws {@link UnsupportedOperationException}.
     */
    @Override
    @Deprecated
    public void shutdown() {
        throw new UnsupportedOperationException();
    }

    @Override
    public boolean isShuttingDown() {
        return false;
    }

    @Override
    public boolean isShutdown() {
        return false;
    }

    @Override
    public boolean isTerminated() {
        return false;
    }

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) {
        return false;
    }

    /**
     * Waits until the worker thread of this executor has no tasks left in its task queue and terminates itself.
     * Because a new worker thread will be started again when a new task is submitted, this operation is only useful
     * when you want to ensure that the worker thread is terminated <strong>after</strong> your application is shut
     * down and there's no chance of submitting a new task afterwards.
     *
     * @param timeout the maximum time to wait, expressed in {@code unit}
     * @param unit    the unit of {@code timeout}
     * @return {@code true} if and only if the worker thread has been terminated
     * @throws NullPointerException  if {@code unit} is {@code null}
     * @throws IllegalStateException if no worker thread has been started yet
     * @throws InterruptedException  if the calling thread is interrupted while waiting
     */
    public boolean awaitInactivity(long timeout, TimeUnit unit) throws InterruptedException {
        if (unit == null) {
            throw new NullPointerException("unit");
        }

        final Thread thread = this.thread;
        if (thread == null) {
            throw new IllegalStateException("thread was not started");
        }
        if (timeout > 0) {
            // A positive timeout shorter than one millisecond used to be truncated to 0 by toMillis(), and
            // Thread.join(0) waits without any time limit. timedJoin() keeps the sub-millisecond remainder,
            // so the wait stays bounded.
            unit.timedJoin(thread, timeout);
        } else {
            // Non-positive timeouts are passed to Thread.join(long) exactly as before, so existing callers
            // observe no change in behaviour.
            thread.join(unit.toMillis(timeout));
        }
        return !thread.isAlive();
    }

    /**
     * Enqueues {@code task} and, when called from a thread other than the worker, makes sure a worker thread
     * is running.
     *
     * @throws NullPointerException if {@code task} is {@code null}
     */
    @Override
    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }

        addTask(task);
        if (!inEventLoop()) {
            startThread();
        }
    }

    /**
     * Starts a new worker thread unless one is already marked as started.
     */
    private void startThread() {
        if (started.compareAndSet(false, true)) {
            final Thread t = threadFactory.newThread(taskRunner);
            // Set to null to ensure we not create classloader leaks by holds a strong reference to the inherited
            // classloader.
            // See:
            // - https://github.com/netty/netty/issues/7290
            // - https://bugs.openjdk.java.net/browse/JDK-7008595
            AccessController.doPrivileged(new PrivilegedAction<Void>() {
                @Override
                public Void run() {
                    t.setContextClassLoader(null);
                    return null;
                }
            });

            // Set the thread field before starting the thread, otherwise inEventLoop() may return false
            // and cause an assert error.
            // See https://github.com/netty/netty/issues/4357
            thread = t;
            t.start();
        }
    }

    /**
     * Worker loop: runs tasks from the queue and terminates the worker thread once only the quiet-period task
     * is left.
     */
    final class TaskRunner implements Runnable {
        @Override
        public void run() {
            for (; ; ) {
                Runnable task = takeTask();
                if (task != null) {
                    try {
                        task.run();
                    } catch (Throwable t) {
                        logger.warn("Unexpected exception from the global event executor: ", t);
                    }

                    // Only after the quiet-period task has run do we fall through to the termination check.
                    if (task != quietPeriodTask) {
                        continue;
                    }
                }

                Queue<ScheduledFutureTask<?>> scheduledTaskQueue = GlobalEventExecutor.this.scheduledTaskQueue;
                // Terminate if there is no task in the queue (except the noop task, i.e. quietPeriodTask).
                if (taskQueue.isEmpty() && (scheduledTaskQueue == null || scheduledTaskQueue.size() == 1)) {
                    // Mark the current thread as stopped.
                    // The following CAS must always succeed and must be uncontended,
                    // because only one worker thread should be running at the same time.
                    boolean stopped = started.compareAndSet(true, false);
                    assert stopped;

                    // Check whether new tasks were added by execute() or schedule*() while we did the CAS above.
                    if (taskQueue.isEmpty() && (scheduledTaskQueue == null || scheduledTaskQueue.size() == 1)) {
                        // A) No new task was added: safe to terminate because there is nothing left to do.
                        // B) A new thread was started and handled all the new tasks: safe to terminate because
                        //    the new thread will take care of the rest.
                        break;
                    }

                    // New tasks have been added.
                    // Try to mark the worker as started again (a new thread may not actually be running yet).
                    if (!started.compareAndSet(false, true)) {
                        // startThread() started a new thread and set 'started' to true.
                        // Terminate this thread so that the new thread reads from taskQueue exclusively.
                        break;
                    }

                    // No new thread was marked as started: this worker was faster to set 'started' to true.
                    // Keep this thread alive so that it handles the newly added tasks.
                }
            }
        }
    }
}
